package com.wqd.kafka.sample;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 生产者
 */
public class ProducerQuickStart {

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            //1.kafka的配置信息
            Properties properties = new Properties();
            //kafka的连接地址
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.11.104:9092");
            //发送失败，失败的重试次数
            properties.put(ProducerConfig.RETRIES_CONFIG,5);
            //消息key的序列化器
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            //消息value的序列化器
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

            //2.生产者对象
            KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

            //封装发送的消息
            //ProducerRecord<String,String> record = new ProducerRecord<String, String>("itwqd-topic","100001","你好 kafka");

            //3.发送消息
           // producer.send(record);
            //3.发送消息
            /**
             * ProducerRecord(String topic, K key, V value)
             * 参数1表示发送消息的主题，也叫队列名称
             * 参数2表示发送消息的key
             参数3表示发送的消息内容
             */
           //for (int i = 1; i <=5 ; i++) {
                if(i%2==0){
                    ProducerRecord record=new ProducerRecord("kafkastream-topic1","key"+i,"hello kafka");
                    producer.send(record);
                }else{
                    ProducerRecord record=new ProducerRecord("kafkastream-topic1","key"+i,"hello itcast");
                    producer.send(record);
                }

           // }

            //4.关闭消息通道，必须关闭，否则消息发送不成功
            producer.close();
        }
    }

}